本文介绍如何使用 DataX 工具将 MySQL 数据库中的数据同步到表格存储(Tablestore)。
背景信息
DataX 是阿里云的离线数据同步工具,它通过 JDBC 连接 MySQL 数据库,发送 SQL 语句获取数据缓存在本地 JVM中,然后通过 Writer 线程将数据写入到表格存储的数据表中。如果想了解更多关于DataX的介绍,请参见DataX。
准备工作
准备需要同步的 MySQL 信息,包括用户名、密码、JDBC 连接信息等。
开通表格存储服务,创建实例和数据表,用于存放同步的数据。具体操作,请参见开通服务并创建实例和创建数据表。
重要创建数据表时,建议使用MySQL原主键或唯一索引作为表格存储数据表的主键。本文使用的MySQL和表格存储数据表样例,请参见附录-样例数据。
获取表格存储的服务地址(Endpoint)和实例名称。
登录表格存储控制台。
在页面上方,选择资源组和地域。
在概览页面,单击实例别名或在操作列单击实例管理。
在实例详情页签,查看实例的名称和服务地址。
获取 AccessKey 信息。请使用阿里云账号或RAM用户的 AccessKey 进行配置。获取AccessKey的具体操作,请参见如何获取AccessKey。
重要出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。您可以创建RAM用户、授予该用户管理表格存储权限(
AliyunOTSFullAccess
)并为该RAM用户创建AccessKey。具体操作,请参见使用RAM用户访问密钥访问表格存储。
操作步骤
本文操作使用的服务器为云服务器ECS,操作系统为Alibaba Cloud Linux 3.2104 LTS 64位和Ubuntu 22.04 64位。
一、安装依赖
安装Python(Python 2和Python 3都可以)。
ECS的Alibaba Cloud Linux和Ubuntu系统已自带Python 3,如果您使用的是其他服务器,请自行进行安装。
安装JDK(1.8及以上,推荐1.8)。
本文介绍在ECS的Alibaba Cloud Linux和Ubuntu系统中安装JDK 1.8的方法,如果您使用的是其他服务器,请自行进行安装。
Alibaba Cloud Linux
yum -y install java-1.8.0-openjdk-devel.x86_64
Ubuntu
apt update && apt upgrade apt install openjdk-8-jdk
二、安装DataX
下载DataX工具包。
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
解压工具包。
tar -zxvf datax.tar.gz
如果有自行编译DataX的需要,请参见DataX安装指引。
三、编写配置文件
进入DataX的
bin
目录。cd datax/bin
创建配置文件。如果您使用的是
vim
,请自行替换命令。vi mysql_to_ots.json
配置文件参考如下,MySQL读取数据有两种模式,请按需选择。
querySQL模式:通过SQL语句查询需要导出的数据,支持联表查询。
table模式:通过指定表名、列名和where条件确定需要导出的数据,DataX会根据上述信息自动拼接SQL语句并抽取数据。该模式还可以通过数据分片实现并发同步。
querySQL模式
请根据您的同步信息和需求替换配置文件内的参数信息。
{ "job": { "setting": { "speed": { "channel": 1 }, "errorLimit": { "record": 0, "percentage": 0 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "mysql_username", "password": "mysql_password", "connection": [ { "querySql": [ "select * from table_name" ], "jdbcUrl": [ "jdbc:mysql://server_ip:3306/database_name?useSSL=false" ] } ] } }, "writer": { "name": "otswriter", "parameter": { "endpoint":"endpoint", "accessId":"accesskey_id", "accessKey":"accesskey_secret", "instanceName":"instance_name", "table":"table_name", "primaryKey":[ {"name":"order_id", "type":"string"} ], "column":[ {"name":"user_id","type":"string"}, {"name":"sku_id","type":"string"}, {"name":"price","type":"double"}, {"name":"num","type":"int"}, {"name":"total_price","type":"double"}, {"name":"order_status","type":"string"}, {"name":"create_time","type":"string"}, {"name":"modified_time","type":"string"} ], "writeMode":"UpdateRow" } } } ] } }
MySQL Reader需要替换的参数说明如下:
参数名称
说明
username
MySQL 连接用户名。
password
MySQL 连接用户密码。
querySql
查询SQL,用于确定需要同步的数据范围。
jdbcUrl
JDBC连接信息。
关于MySQL Reader的更多信息,请参见MySQL Reader介绍。
表格存储Writer需要替换的参数说明如下:
参数名称
说明
endpoint
实例服务地址。
accessId
阿里云账号或RAM用户的AccessKey ID。
accessKey
阿里云账号或RAM用户的AccessKey Secret。
instanceName
实例名称。
table
目标表名称。
primaryKey
目标表的主键列列表。
column
需要写入的属性列。
关于表格存储Writer的更多信息,请参见表格存储Writer介绍。
table模式
请根据您的同步信息和需求替换配置文件内的参数信息。
{ "job": { "setting": { "speed": { "channel": 1 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "mysql_username", "password": "mysql_password", "column": [ "order_id", "user_id" , "sku_id" , "price", "num", "total_price", "order_status", "create_time", "modified_time" ], "splitPk": "num", "connection": [ { "table": [ "table_name" ], "jdbcUrl": [ "jdbc:mysql://server_ip:3306/database_name?useSSL=false" ] } ] } }, "writer": { "name": "otswriter", "parameter": { "endpoint":"endpoint", "accessId":"accesskey_id", "accessKey":"accesskey_secret", "instanceName":"instance_name", "table":"table_name", "primaryKey":[ {"name":"order_id", "type":"string"} ], "column":[ {"name":"user_id","type":"string"}, {"name":"sku_id","type":"string"}, {"name":"price","type":"double"}, {"name":"num","type":"int"}, {"name":"total_price","type":"double"}, {"name":"order_status","type":"string"}, {"name":"create_time","type":"string"}, {"name":"modified_time","type":"string"} ], "writeMode":"UpdateRow" } } } ] } }
MySQL Reader需要替换的参数说明如下:
参数名称
说明
username
MySQL 连接用户名。
password
MySQL 连接用户密码。
column
需要同步的源表字段。
splitPk
数据分片的字段,只在channel数不为1时按指定的字段进行数据分片查询。
说明DataX数据分片支持整型和字符串类型的字段,仅推荐按整型字段进行数据分片。
table
MySQL源表表名。
jdbcUrl
JDBC连接信息。
关于MySQL Reader的更多信息,请参见MySQL Reader介绍。
表格存储Writer需要替换的参数说明如下:
参数名称
说明
endpoint
实例服务地址。
accessId
阿里云账号或RAM用户的AccessKey ID。
accessKey
阿里云账号或RAM用户的AccessKey Secret。
instanceName
实例名称。
table
目标表名称。
primaryKey
目标表的主键列列表。
column
需要写入的属性列。
关于表格存储Writer的更多信息,请参见表格存储Writer介绍。
四、执行同步命令
执行以下命令开始同步数据。
python3 datax.py mysql_to_ots.json
同步任务结束后,将打印整体运行情况。
2025-02-10 18:02:17.355 [job-0] INFO JobContainer - 任务启动时刻 : 2025-02-10 18:02:06 任务结束时刻 : 2025-02-10 18:02:17 任务总计耗时 : 11s 任务平均流量 : 871.56KB/s 记录写入速度 : 10000rec/s 读出记录总数 : 100000 读写失败总数 : 0
验证同步结果。
您可以前往表格存储控制台查看已导入的数据。
相关文档
表格存储数据列支持的数据类型,请参见数据类型。
附录-样例数据
MySQL源表
MySQL数据样例源表建表语句如下:
create table orders (
order_id varchar(50) primary key comment '订单ID',
user_id varchar(10) not null comment '用户ID',
sku_id varchar(10) not null comment '商品ID',
price decimal(12, 2) not null comment '商品购买单价',
num int not null comment '商品购买数量',
total_price decimal(12, 2) not null comment '订单总价',
order_status varchar(2) not null comment '订单状态',
create_time timestamp not null default current_timestamp comment '订单创建时间',
modified_time timestamp not null default current_timestamp on update current_timestamp comment '最后修改时间'
);
表格存储目标表
由于表格存储是schema-free的,创建目标表时仅需指定主键 order_id
即可。目标表基本详情如下: